package com.jhf.youke.pulsar.domain.service;

import lombok.extern.log4j.Log4j2;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;


/**
 * @author : rhj
 * @version : 1.0
 */
@Log4j2
@Component
public class ProducerService {

    @Resource
    private ProducerClient producerClient;

    private Producer<String> producer;

    public ProducerService(){

    }

    public ProducerService(String topic) throws PulsarClientException {
//        client = new ProducerClient();
        producer = createProducer(topic);
    }

    private Producer<String> createProducer(String topic) throws PulsarClientException {
        return producerClient.getPulsarClient().newProducer(Schema.STRING)
                .topic(topic)
                .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
                .sendTimeout(10, TimeUnit.SECONDS)
                .blockIfQueueFull(true)
                .create();
    }

    public void sendMessage(String topic, String message) throws PulsarClientException{
        producerClient.init();
        producer = createProducer(topic);
        producer.sendAsync(message).thenAccept(msgId -> {
            log.info("Message with ID %s successfully sent {}", msgId);
        });
    }

    public void sendOnce(String message) {
        /**
         * 发送一次就关闭
         */
        try {
            producer.send(message);
            log.info("Message with content %s successfully sent {}", message);
            producer.close();
            producerClient.close();
        } catch (PulsarClientException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }


    public void close(Producer<byte[]> producer){
        producer.closeAsync()
                .thenRun(() -> log.info("Producer closed"));
    }


    public static void main(String[] args) throws PulsarClientException {
        ProducerService producer = new ProducerService("topic1");
        producer.sendOnce("Hello World 1");

    }


}
